package com.rabbitmq.spring.api;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.io.UnsupportedEncodingException;

import static com.rabbitmq.common.constants.CommonConstants.*;


/**
 * @author
 * @Describe direct交换机，手动确认消息
 * @date
 */
@Component
@Slf4j(topic = "SpringConsumerDirectQueueAckManual")
public class SpringConsumerDirectQueueManualAck {

    /************ 1.通过手动设置消息监听器来处理消息手动确认,此处只需声明队列，交换机并进行绑定开始 ************
     * 在监听器 MessageListenerConfig中对需要监听的队列进行设置，后续所有对该队列的消息消费都统一走设置的消息监听消费即ManualAckMessageListener
     */
    /**
     * 手动声明队列bean
     *
     * @return
     */
    @Bean
    public org.springframework.amqp.core.Queue publishDirectQueueManualAck() {
        return new org.springframework.amqp.core.Queue(SPRING_QUEUE_NAME_PREFIX + "direct_publishDirectQueueManualAck");
    }

    /**
     * 手动声明交换机bean
     *
     * @return
     */
    @Bean
    public DirectExchange publishDirectExchangeManualAck() {
        return new DirectExchange(SPRING_EXCHANGE_NAME_PREFIX + "direct_publishDirectExchangeManualAck");
    }

    /**
     * 手动声明binding bean
     *
     * @return
     */
    @Bean
    public Binding bindQueue2ExchangeManualAck(org.springframework.amqp.core.Queue publishDirectQueueManualAck,
                                               DirectExchange publishDirectExchangeManualAck) {
        return BindingBuilder.bind(publishDirectQueueManualAck).to(publishDirectExchangeManualAck).with("kinson_routingkey_info");
    }

    /**
     * 自定义消费SPRING_QUEUE_NAME_PREFIX + "direct_publishDirectQueueManualAck"队列消息
     */
    /*@RabbitListener(queues = SPRING_QUEUE_NAME_PREFIX + "direct_publishDirectQueueManualAck")
    public void receiveDirectQueueManualAck(String sendMsg, Channel channel, Message message) throws UnsupportedEncodingException, IOException {
        MessageProperties messageProperties = message.getMessageProperties();
        log.info(" ===== receiveDirectQueueManualAck deliveryTag is {} ===== ", messageProperties.getDeliveryTag());
        log.info(" ===== receiveDirectQueueManualAck sendMsg is {} ===== ", sendMsg);
        log.info(" ===== receiveDirectQueueManualAck msg is {} ===== ", new String(message.getBody(), UTF_8));
        log.info(" ===== receiveDirectQueueManualAck headers is {} ===== ", messageProperties.getHeaders());
        log.info(" ===== receiveDirectQueueManualAck receivedExchange is {} ===== ", messageProperties.getReceivedExchange());
        log.info(" ===== receiveDirectQueueManualAck receivedRoutingKey is {} ===== ", messageProperties.getReceivedRoutingKey());
        log.info(" ===== receiveDirectQueueManualAck consumerQueue is {} ===== ", messageProperties.getConsumerQueue());

        channel.basicAck(messageProperties.getDeliveryTag(), false);

    }*/
    /************ 1.通过手动设置消息监听器来处理消息手动确认 结束 ************/


    /************ 2.通过配置文件全局设置确认为手动确认 开始 ************/
    /**
     * 在注解里进行队列绑定交换机和路由信息
     *
     * @param message
     * @throws UnsupportedEncodingException
     */
    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = SPRING_QUEUE_NAME_PREFIX + "direct" + "_publishDirectQueueGlobalManualAck"), // 声明队列
            exchange = @Exchange(name = SPRING_EXCHANGE_NAME_PREFIX + "direct" + "_publishDirectExchangeGlobalManualAck", type = "direct"), // 交换机
            key = {"kinson_routingkey_warn"} // 路由
    ))
    public void receive(String sendMsg, Channel channel, Message message) {
        MessageProperties messageProperties = message.getMessageProperties();
        try {
            log.info(" ===== receive deliveryTag is {} ===== ", messageProperties.getDeliveryTag());
            log.info(" ===== receive sendMsg is {} ===== ", sendMsg);
            log.info(" ===== receive msg is {} ===== ", new String(message.getBody(), UTF_8));
            log.info(" ===== receive headers is {} ===== ", messageProperties.getHeaders());
            log.info(" ===== receive receivedExchange is {} ===== ", messageProperties.getReceivedExchange());
            log.info(" ===== receive receivedRoutingKey is {} ===== ", messageProperties.getReceivedRoutingKey());
            log.info(" ===== receive consumerQueue is {} ===== ", messageProperties.getConsumerQueue());

            // 告诉服务器收到这条消息已经被当前消费者消费了，可以在队列安全删除，这样后面就不会再重发了，
            // 否则消息服务器以为这条消息没处理掉，后续还会再发
            // 第二个参数是消息的标识，false只确认当前一个消息收到，true确认所有consumer获得的消息
            channel.basicAck(messageProperties.getDeliveryTag(), false);
        } catch (Exception e) {
            log.error(" receive4 basicAck 异常", e);
            try {
                // 第一个参数依然是当前消息到的数据的唯一id;
                // 第二个参数是指是否针对多条消息；如果是true，也就是说一次性针对当前通道的消息的tagID小于当前这条消息的，都不进行消费。
                // 第三个参数是指是否重新入列，也就是指不确认的消息是否重新丢回到队列里面去。
                // 慎用，考虑不周可能导致循环处理导致消息积压
                channel.basicNack(messageProperties.getDeliveryTag(), false, true);

                // 拒绝消费当前消息，如果第二参数传入true，就是将数据重新丢回队列里，那么下次还会消费这消息。设置false，就是告诉服务器，我已经知道这条消息数据了，因为一些原因拒绝它，而且服务器也把这个消息丢掉就行。 下次不想再消费这条消息了。
                // 慎用，考虑不周可能导致循环处理导致消息积压
                // channel.basicReject(messageProperties.getDeliveryTag(), true);
            } catch (IOException e1) {
                log.error(" receive4 basicNack异常", e1);
            }
        }
    }
    /************ 2.通过配置文件全局设置确认为手动确认 结束 ************/

}
